package top.lucky.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import top.lucky.kafka.utils.PropertiesUtil;

import java.util.Properties;

/**
 * Demo of a transactional Kafka producer: sends a small batch of records to a single
 * topic inside one transaction, committing when every step succeeds and rolling the
 * transaction back when a recoverable failure occurs.
 *
 * @author Jane
 * @date 2022/12/28, 11:29
 */
public class KafkaProducerTransactions {

	/** Transactional id set on the producer configuration. */
	private static final String TRANSACTIONAL_ID = "transaction_id_1";

	/** Topic that receives the demo records. */
	private static final String TOPIC = "first";

	/** Number of records sent inside the transaction. */
	private static final int MESSAGE_COUNT = 5;

	/**
	 * Sends {@link #MESSAGE_COUNT} key-less records to {@link #TOPIC} in one transaction.
	 *
	 * <p>Base producer settings come from {@code PropertiesUtil.getProperties()}
	 * (presumably bootstrap servers and serializers — confirm against that helper);
	 * only the transactional id is added here.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		Properties properties = PropertiesUtil.getProperties();
		properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTIONAL_ID);

		// try-with-resources closes the producer on every path, including a failure
		// in initTransactions() or beginTransaction() before any record is sent.
		try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
			kafkaProducer.initTransactions(); // initialize transactions
			kafkaProducer.beginTransaction();

			try {
				for (int i = 0; i < MESSAGE_COUNT; i++) {
					kafkaProducer.send(new ProducerRecord<>(TOPIC, "mmmmmm" + i));
				}
				// To exercise the rollback path, throw here (e.g. int i = 100 / 0;).
				kafkaProducer.commitTransaction();
			} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
				// The Kafka producer documentation lists these as unrecoverable: the only
				// option is to close the producer, so abortTransaction() is not attempted.
				System.err.println("Fatal transactional error, closing producer without abort: " + e);
			} catch (RuntimeException e) {
				// Any other failure: report it, then roll the transaction back.
				System.err.println("Transaction failed, aborting: " + e);
				kafkaProducer.abortTransaction();
			}
		}
	}
}
